package com.atguigu.day10;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: declares a Kafka-backed {@code sensor} table through a SQL DDL statement
 * that also defines a processing-time column ({@code pt AS PROCTIME()}), then
 * prints the schema of a query over that table.
 *
 * <p>Only {@code printSchema()} is called on the query result; {@code env.execute()}
 * is never invoked in this class.
 */
public class FlinkSQL11_ProcessTime_DDL {

    /** Column list of the sensor table; {@code pt} is the computed PROCTIME() column. */
    private static final String SENSOR_COLUMNS = "id string,ts bigint,vc double,pt AS PROCTIME()";

    public static void main(String[] args) {

        // 1. Obtain the streaming environment and the table environment built on top of it.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register the source table via DDL, declaring the processing-time attribute inline.
        final String sensorDdl = buildSensorDdl();
        tableEnv.executeSql(sensorDdl);

        // 3. Query the registered table and print its schema.
        tableEnv.sqlQuery("select * from sensor").printSchema();

    }

    /**
     * Assembles the CREATE TABLE statement for the {@code sensor} table.
     *
     * @return the DDL text: column list followed by the comma-separated connector/format options
     */
    private static String buildSensorDdl() {
        // Connector and format options, joined with "," and no trailing comma.
        final String options = String.join(",",
                "'connector.type' = 'kafka'",
                "'connector.version' = 'universal'",
                "'connector.topic' = 'test'",
                "'connector.properties.bootstrap.servers' = 'hadoop102:9092'",
                "'connector.properties.group.id' = 'bigdata1109'",
                "'format.type' = 'json'");

        // No whitespace is emitted between the column list's ")" and "with (".
        return "create table sensor(" + SENSOR_COLUMNS + ")" + "with (" + options + ")";
    }

}
